/*!

\page so_5_5__in_depth_message_delivery_filters so-5.5 In Depth - Message Delivery Filters

There are situations when an agent must filter the messages it receives. A good
example is receiving run-time monitoring information from the SObjectizer
Environment (see \ref so_5_5__in_depth_runtime_monitoring): there are a lot of
messages and an agent has to handle only a few of them:

\code
void timer_thread_stats_listener::evt_timer_quantities(
    const so_5::rt::stats::messages::quantity< std::size_t > & evt )
{
    // Ignore messages unrelated to timer thread.
    if( so_5::rt::stats::prefixes::timer_thread() != evt.m_prefix )
        return;
    ... // Processing of messages related to the timer thread.
}
\endcode

This kind of filtering of unwanted messages is inefficient and has a
significant run-time cost. Every message the agent is subscribed to is stored
in the agent's queue, then extracted and passed to the agent's event handler,
only to be thrown out. Message delivery filters are a more efficient
message-filtering mechanism.

A message delivery filter is a predicate which receives the message instance
and returns `true` if the message is allowed to be delivered to the
corresponding recipient or `false` if the message must be thrown out.

For the example above a message delivery filter can be set as follows:

\code
void timer_thread_stats_listener::so_define_agent()
{
    so_set_delivery_filter(
        so_environment().stats_controller().mbox(),
        []( const so_5::rt::stats::messages::quantity< std::size_t > & msg ) {
            return so_5::rt::stats::prefixes::timer_thread() == msg.m_prefix;
        } );
    ...
}
\endcode

In v.5.5.5 only ordinary agents can set delivery filters. A delivery filter
must be a lambda function (or function object) with exactly one argument, a
constant reference to the message instance, and it must return `bool`.

An agent stores all the delivery filters it defines. All delivery filters are
destroyed automatically at the end of the agent's lifetime. A delivery filter
can be explicitly dropped and destroyed by the `so_drop_delivery_filter()`
method:

\code
void timer_thread_stats_listener::evt_some_action()
{
    so_drop_delivery_filter<
            so_5::rt::stats::messages::quantity< std::size_t >
        >( so_environment().stats_controller().mbox() );
    ...
}
\endcode

If the delivery filter needs to be changed, it is enough to call
`so_set_delivery_filter()` again; there is no need to call
`so_drop_delivery_filter()` first:

\code
// Setting the filter first time.
so_set_delivery_filter( mbox, []( const some_message & msg ) {
        return ...; // some predicate
    } );

... // Some messages could be sent to mbox here.

// Replacing the filter with a new one.
so_set_delivery_filter( mbox, []( const some_message & msg ) {
        return ...; // another predicate
    } );

... // Some messages could be sent to mbox here.
\endcode

The `so_set_delivery_filter()` method stores a reference to the delivery filter
in the corresponding mbox. From then on the mbox invokes the delivery filter
before delivering a message to that recipient to check whether the delivery is
necessary. If the delivery filter returns `false`, the message is not stored in
the recipient's queue at all.

Since v.5.5.5 multi-producer multi-consumer mboxes work this way:

- there is a list of subscribers for every message type;
- every subscriber in that list can have an optional delivery filter;
- during message delivery the mbox walks through that list and checks the
  delivery filter of every subscriber. If there is a delivery filter and it
  returns `false` then the message is not delivered to that subscriber. If
  there is no delivery filter or the delivery filter returns `true` then the
  message is stored in the subscriber's event queue.
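
The delivery walk described above can be sketched without SObjectizer at all.
The following is a minimal standard-library model, not the actual mbox
implementation: `temperature`, `subscriber`, `deliver` and `demo_queue_sizes`
are hypothetical names invented for this illustration, and an empty
`std::function` stands for "no delivery filter".

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-ins for illustration; none of these are SObjectizer types.
struct temperature { int value; };

struct subscriber
{
    std::string name;
    // An empty std::function means "no delivery filter".
    std::function< bool( const temperature & ) > filter;
    // Stand-in for the subscriber's event queue.
    std::vector< temperature > queue;
};

// Walks the subscriber list: a message is skipped only when a filter
// exists and returns false, otherwise it is stored in the queue.
inline void deliver(
    std::vector< subscriber > & subscribers,
    const temperature & msg )
{
    for( auto & s : subscribers )
    {
        if( s.filter && !s.filter( msg ) )
            continue; // Rejected: never reaches this subscriber's queue.
        s.queue.push_back( msg );
    }
}

// Sends three readings to one unfiltered and one filtered subscriber
// and returns the resulting queue sizes in subscriber order.
inline std::vector< std::size_t > demo_queue_sizes()
{
    std::vector< subscriber > subs;
    subs.push_back( subscriber{ "logger", nullptr, {} } );
    subs.push_back( subscriber{ "alarm",
        []( const temperature & t ) { return t.value > 45; }, {} } );

    const int readings[] = { 10, 50, 21 };
    for( int v : readings )
        deliver( subs, temperature{ v } );

    // The unfiltered subscriber gets everything, the filtered one only 50.
    assert( subs[ 0 ].queue.size() == 3 );
    assert( subs[ 1 ].queue.size() == 1 );
    return { subs[ 0 ].queue.size(), subs[ 1 ].queue.size() };
}
```

In this model a rejected message costs one predicate call and nothing else,
which is the saving over filtering inside an event handler.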

Please note that a delivery filter is applied to a message only once, during
message dispatching inside the mbox. When a message is stored in an agent's
event queue and then extracted for processing, the message instance is not
checked again. This can be very important if a delivery filter is replaced by a
new one. For example:

\code
void my_agent::evt_demo() 
{
    // All actions are performed inside an event handler. This means
    // that the agent can handle the messages sent here only after
    // this event handler finishes.

    // MPMC-mbox is necessary.
    const auto mbox = so_environment().create_local_mbox();

    // Create event subscription for message.
    so_subscribe( mbox ).event( []( const current_temperature & evt ) {
        cout << "temperature is: " << evt.value() << endl;
    } );

    // Send the first message. Temperature is 10 degrees.
    so_5::send< current_temperature >( mbox, 10 );

    // Sets the first filter.
    so_set_delivery_filter( mbox, []( const current_temperature & evt ) {
        // Allows only messages with values outside of normal range [0,45].
        return !( 0 <= evt.value() && evt.value() <= 45 );
    } );

    // Send the second message. Temperature is -10 degrees.
    so_5::send< current_temperature >( mbox, -10 );

    // Replace the filter with another.
    so_set_delivery_filter( mbox, []( const current_temperature & evt ) {
        // Allows only messages with values in very narrow range.
        return ( 20 <= evt.value() && evt.value() <= 22 );
    } );

    // Send the third message. Temperature is 21 degrees.
    so_5::send< current_temperature >( mbox, 21 );
}
\endcode

In that case the agent will receive all three messages: the first one because
there was no filter at the time of the send operation, the second one because
it passes the first delivery filter, and the third one because it passes the
second delivery filter.

This can surprise a developer who expects a delivery filter to be applied to
messages which are already in the agent's queue. That is not the case.
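
The same sequence can be replayed in a small standard-library model to make the
timing explicit. This is only a sketch under the assumption that filtering
happens at send time and nowhere else; `filtered_queue` and
`replay_temperature_example` are hypothetical names, not SObjectizer API.

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Hypothetical single-subscriber model; not SObjectizer code.
struct filtered_queue
{
    std::function< bool( int ) > filter; // Empty means "no filter".
    std::vector< int > queue;            // Stand-in for the event queue.

    // The filter is consulted here, at send time, and nowhere else.
    void send( int value )
    {
        if( !filter || filter( value ) )
            queue.push_back( value );
    }

    // Processing drains the queue without looking at the filter again.
    std::vector< int > process_all()
    {
        std::vector< int > handled;
        handled.swap( queue );
        return handled;
    }
};

// Replays the temperature example: send, set a filter, send,
// replace the filter, send, and only then process the queue.
inline std::vector< int > replay_temperature_example()
{
    filtered_queue q;
    q.send( 10 );  // No filter yet: queued.
    q.filter = []( int v ) { return !( 0 <= v && v <= 45 ); };
    q.send( -10 ); // Outside [0,45]: queued.
    q.filter = []( int v ) { return 20 <= v && v <= 22; };
    q.send( 21 );  // Inside [20,22]: queued.

    // 10 and -10 would fail the current filter, but they are already
    // queued and are handled anyway.
    std::vector< int > handled = q.process_all();
    assert( handled.size() == 3 );
    return handled;
}
```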

<b>Important note.</b> <i>A delivery filter can be set only for an MPMC-mbox. It
is impossible to set a delivery filter for an MPSC (direct) mbox.</i>

This is because the main purpose of an MPSC-mbox is to be a very fast and
efficient direct channel to an agent. Any additional checking would slow down
the MPSC-mbox.

There are some requirements for delivery filter implementations:

- a delivery filter must be as fast as possible. It is called during the
  message delivery operation, so a slow filter can significantly slow down a
  part of the application or the whole application;
- a delivery filter must be thread-safe. It can be called on different working
  threads, possibly at the same time.

Ideally, a delivery filter is a small, lightweight, side-effect-free
lambda function (like the ones in the examples above).
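
When a function object is preferred over a lambda, one way to meet both
requirements is to keep only immutable state and a `const` call operator. The
sketch below is an illustration, not part of SObjectizer: `reading`,
`out_of_range_filter` and `count_passed` are hypothetical names.

```cpp
#include <cassert>
#include <vector>

// Hypothetical message type used only for this sketch.
struct reading { int value; };

// A filter with only immutable state and a const call operator.
// Concurrent calls cannot race because nothing is ever written.
class out_of_range_filter
{
public:
    out_of_range_filter( int low, int high )
        : m_low( low ), m_high( high )
    {
        assert( low <= high ); // The range must not be empty.
    }

    // Returns true only for values outside of [m_low, m_high].
    bool operator()( const reading & r ) const
    {
        return r.value < m_low || r.value > m_high;
    }

private:
    const int m_low;
    const int m_high;
};

// Counts how many readings the filter would let through.
inline int count_passed(
    const out_of_range_filter & f,
    const std::vector< reading > & rs )
{
    int passed = 0;
    for( const auto & r : rs )
        if( f( r ) )
            ++passed;
    return passed;
}
```

Because the predicate only performs two integer comparisons and never modifies
anything, repeated or simultaneous calls always give the same answer for the
same message.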

Setting up a delivery filter and subscribing to an event are different
operations which are not tightly related. It is possible to set up a delivery
filter without subscribing to the message. It is also possible to subscribe to
a message without setting up a delivery filter.

When a delivery filter is set but there are no subscriptions to the message,
the mbox stores only the filter. The mbox knows that there are no subscriptions
to the message and will not call the delivery filter during message dispatch.
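
This behaviour can be modelled with a per-recipient record that checks the
subscription before the filter. The code is a hedged sketch, not the real mbox
internals: `recipient_record`, `dispatch`, `demo_result` and
`run_filter_only_demo` are hypothetical names, and the filter counts its own
calls only to make the effect observable (a real filter should stay
side-effect free).

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// Hypothetical per-recipient record kept by an mbox; not SObjectizer code.
struct recipient_record
{
    bool subscribed;                     // Is there an event subscription?
    std::function< bool( int ) > filter; // Empty means "no filter".
    std::vector< int > queue;            // Stand-in for the event queue.
};

// The subscription is checked first, so a filter without a subscription
// stays in the record but is never invoked.
inline void dispatch( recipient_record & r, int msg )
{
    if( !r.subscribed )
        return;
    if( r.filter && !r.filter( msg ) )
        return;
    r.queue.push_back( msg );
}

struct demo_result
{
    int calls_without_subscription;
    int calls_with_subscription;
    std::size_t queued;
};

inline demo_result run_filter_only_demo()
{
    int calls = 0;
    // Counting calls is a side effect used for demonstration only.
    recipient_record r{ false,
        [&calls]( int v ) { ++calls; return v == 1; }, {} };

    dispatch( r, 0 );
    dispatch( r, 1 );
    const int calls_before = calls;
    assert( calls_before == 0 ); // Filter-only: the predicate never ran.

    r.subscribed = true; // Now a subscription exists as well.
    dispatch( r, 0 );
    dispatch( r, 1 );
    return demo_result{ calls_before, calls - calls_before, r.queue.size() };
}
```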

A delivery filter and subscription for a message can be handled independently.
For example:

\code
using namespace std;
using namespace so_5;
using namespace so_5::rt;

// Message to be filtered.
using msg_sample = tuple_as_message_t< mtag<0>, int, string >;

// A signal for starting the second part of the example.
struct msg_second_part : public signal_t {};

// A signal for finishing the example.
struct msg_shutdown : public signal_t {};

// Main example agent.
// An ordinary agent is necessary because a delivery filter can be set
// only by an ordinary agent.
class a_example_t : public agent_t
{
public :
	a_example_t( context_t ctx )
		:	agent_t( ctx )
		,	m_mbox( so_environment().create_local_mbox() )
	{}

	virtual void
	so_define_agent() override
	{
		so_subscribe_self()
			.event< msg_second_part >( &a_example_t::evt_second_part )
			.event< msg_shutdown >( [this] {
				so_deregister_agent_coop_normally();
			} );
	}

	virtual void
	so_evt_start() override
	{
		// Subscribe for the message. Without filter.
		so_subscribe( m_mbox ).event( []( const msg_sample & evt ) {
		  cout << "[first]: " << get<0>(evt) << "-" << get<1>(evt) << endl;
		} );
		// Sending several messages...
		// All of them will be stored to the agent's queue and handled.
		send< msg_sample >( m_mbox, 0, "only-subscription" );
		send< msg_sample >( m_mbox, 1, "only-subscription" );

		// Setting a delivery filter for message.
		so_set_delivery_filter( m_mbox, []( const msg_sample & evt ) {
		  return 1 == get<0>(evt);
		} );
		// Sending several messages...
		// Only one will be stored to the agent's queue and handled.
		send< msg_sample >( m_mbox, 0, "subscription-and-filter" );
		send< msg_sample >( m_mbox, 1, "subscription-and-filter" );

		// Give the agent time to process the already queued messages.
		send_to_agent< msg_second_part >( *this );
	}

	void
	evt_second_part()
	{
		// Drop the subscription.
		so_drop_subscription< msg_sample >( m_mbox );
		// Sending several messages...
		// None of them will be stored to the agent's queue or handled.
		send< msg_sample >( m_mbox, 0, "only-filter" );
		send< msg_sample >( m_mbox, 1, "only-filter" );

		// Subscribe for the message again.
		so_subscribe( m_mbox ).event( []( const msg_sample & evt ) {
		  cout << "[second]: " << get<0>(evt) << "-" << get<1>(evt) << endl;
		} );
		// Sending several messages...
		// Only one will be stored to the agent's queue and handled.
		send< msg_sample >( m_mbox, 0, "subscription-and-filter-2" );
		send< msg_sample >( m_mbox, 1, "subscription-and-filter-2" );

		// Replacing the filter with a new one.
		so_set_delivery_filter( m_mbox, []( const msg_sample & evt ) {
		  return 0 == get<0>(evt);
		} );
		// Sending several messages...
		// Only one will be stored to the agent's queue and handled.
		send< msg_sample >( m_mbox, 0, "subscription-and-filter-3" );
		send< msg_sample >( m_mbox, 1, "subscription-and-filter-3" );

		// Dropping the filter.
		so_drop_delivery_filter< msg_sample >( m_mbox );
		// Sending several messages...
		// All of them will be stored to the agent's queue and handled.
		send< msg_sample >( m_mbox, 0, "only-subscription-2" );
		send< msg_sample >( m_mbox, 1, "only-subscription-2" );

		// The example can be finished now.
		send_to_agent< msg_shutdown >( *this );
	}

private :
	// Separate MPMC-mbox is necessary for delivery filter.
	const mbox_t m_mbox;
};
\endcode

*/

// vim:ft=cpp

